<!DOCTYPE HTML>
<html>
<head>
    <meta http-equiv="content-type" content="text/html; charset=utf-8"/>
    <meta name="google-site-verification" content="KEatQX-J4dYY-6J2KU_aP5X8gAJ8wS0lhylI8umX6WA" />
    <meta name="viewport" content="width=device-width,initial-scale=1,minimal-ui">
    <link rel="shortcut icon" href="../images/favicon.ico">
    <link rel="stylesheet" href="../css/code.css" type="text/css"/>
    <link rel="stylesheet" href="../css/bootstrap.css" type="text/css"/>
    <link rel="stylesheet" href="../css/main.css" type="text/css"/>
    <title>编程小梦|Apache Doris Stream Load &amp; Kakfa To Doris</title>
</head>
<body>
<nav class="navbar navbar-default navbar-static-top" style="opacity: .9" role="navigation">
    <div class="container-fluid">
        <div class="navbar-header">
            <button type="button" class="navbar-toggle" data-toggle="collapse"
                    data-target="#bs-example-navbar-collapse-1">
                <span class="sr-only">Toggle navigation</span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
                <span class="icon-bar"></span>
            </button>
            <a class="navbar-brand" href="/">编程小梦</a>
        </div>
        <div class="collapse navbar-collapse" id="bs-example-navbar-collapse-1">
            <ul class="nav navbar-nav navbar-right">
                <li class="active"><a href="/">Blog</a></li>
                
                <li><a href="https://github.com/kangkaisen" target="_blank" rel="nofollow">GitHub</a></li>
                
                
                <li><a href="http://weibo.com/533234148" target="_blank" rel="nofollow">WeiBo</a></li>
                
            </ul>
        </div>
    </div>
</nav>
<div class="row" style="padding-top: 60px">
    <div class="container center-block">
        <div class="col-md-1"></div>
        <div class="col-md-10 col-sm-12">
            <h1> Apache Doris Stream Load &amp; Kakfa To Doris</h1>
            <hr/>
            <p>作者: 康凯森</p>
            <p>日期: 2019-01-01</p>
            <p>分类: <a href="../tag/OLAP.html" target="_blank" >OLAP</a></p>
            <hr/>
            <p>Doris 最新版本已经支持 Stream Load，本文简介下 Doris Stream Load 的核心原理以及我如何在 Doris 外围实现 Exactly Once 的 Kafka To Doris 实时数据同步。</p>
<!-- toc -->
<ul>
<li><a href="#doris-stream-load-原理">Doris Stream Load 原理</a><ul>
<li><a href="#doris-stream-load-流程">Doris Stream Load 流程</a></li>
<li><a href="#doris-stream-load-faq">Doris Stream Load FAQ</a></li>
</ul>
</li>
<li><a href="#kafka-to-doris">Kafka To Doris</a><ul>
<li><a href="#方案设计">方案设计</a></li>
<li><a href="#mysql-meta-schema">Mysql Meta Schema</a></li>
<li><a href="#核心数据结构">核心数据结构</a></li>
<li><a href="#如何保证-exactly-once">如何保证 Exactly Once</a></li>
<li><a href="#正常的数据处理流">正常的数据处理流</a></li>
<li><a href="#初始化过程">初始化过程</a></li>
<li><a href="#实现细节">实现细节</a></li>
</ul>
</li>
<li><a href="#总结">总结</a></li>
</ul>
<!-- toc stop -->
<h2 id="doris-stream-load-原理">Doris Stream Load 原理</h2>
<p><img src="media/15436477918196/doris-stream-load.png" alt="doris-stream-load"></p>
<p>Doris Stream Load 实现的核心思想和 HBase，Druid 比较类似：在内存中实现一个面向写优化的 MemTable(MemStore)，当 MemTable 的大小超过一定阈值时，进行 flush，将 MemTable 转为带有前缀索引的 SSTable（HBase 中是将 MemStore 转为 HFile, Druid 中是将 Map 转为带倒排索引的 Segment）</p>
<p>Doris 新增了 GlobalTransactionMgr 来保证单次 Stream Load 的原子性。</p>
<h3 id="doris-stream-load-流程">Doris Stream Load 流程</h3>
<p>下面我简介下 Doris 一次 Stream Load 的流程：</p>
<ol>
<li>用户发起 Stream Load 的 Http 请求</li>
<li>FE 经过鉴权后将请求转发给 1 台 BE</li>
<li>BE 向 FE 发起 loadTxnBegin 开始事务的请求</li>
<li>FE 的 GlobalTransactionMgr 开启事务，事务状态标记为 PREPARE，向 BE 返回 txnId</li>
<li>BE 向 FE 发起 streamLoadPut 请求</li>
<li>FE 的 StreamLoadPlanner 生成 Stream Load 的 Plan：Stream Load 的 Plan 只有一个 fragment， 包含一个 StreamLoadScanNode 和 一个 OlapTableSink，StreamLoadScanNode 负责摄入实时数据，OlapTableSink 负责将实时数据发送到每条数据对应的 Tablet 所在的 BE 节点（每条数据对应哪个 Tablet 是根据 PartitionKey 和 DistributionKey 确定的，每个 Tablet 对应哪些 BE 是在建表或建 Partition 时就确定的）</li>
<li>BE 执行 Stream Load 的 Plan: BrokerScanNode 实时读取流式数据，并将 CSV 格式的数据转为 Doris 的 Tuple 格式，OlapTableSink 将数据发送到对应的 BE 节点。 数据接收节点的 BE 的 TabletsChannel 会调用 DeltaWriter 将数据插入到内存的 MemTable，MemTable 的数据结构是 SkipList，当 MemTable 的数据大小超过 config::write_buffer_size 时，会触发 flush 操作，转为列存。</li>
<li>当 Stream Load 结束时，BE 向 FE 发起 loadTxnCommit 请求</li>
<li>FE commit 事务，事务状态标记为 PREPARE， 发布 PublishVersionTask，更新 Table 元数据。</li>
<li>BE 执行 PublishVersionTask，将刚才 flush 的实时数据移至每个 Tablet 对应的数据目录下，更新每个文件的元数据</li>
<li>FE 等待 PublishVersionTask 执行完成，将事务状态置为 VISIBLE，向 BE 返回 loadTxnCommit 的执行结果</li>
<li>BE 向用户返回此次 Stream Load 的结果，Stream Load 结束。</li>
</ol>
<h3 id="doris-stream-load-faq">Doris Stream Load FAQ</h3>
<ul>
<li>Doris Stream Load 内存中的数据会进行聚合吗？</li>
</ul>
<p>会的，当数据在插入到内存的 SkipList 时，Aggregate 模型 和 Uniq 模型的数据会进行聚合</p>
<ul>
<li>Doris Stream Load 刚插入内存中的数据可查吗？</li>
</ul>
<p>不可查， 和 HBase,Druid 不同，Doris 只有每次 Stream Load 的事务状态变为 VISIBLE 后，数据才可查。Doris 目前并没有实现内存中数据的 Scanner。</p>
<ul>
<li>Doris Stream Load 时 BE 节点挂掉 会怎样？</li>
</ul>
<p>如果是和此次 Stream Load 无关的 BE 节点，肯定不会受影响；</p>
<p>如果是负责数据摄入的 BE 节点挂掉，此次 Stream Load 会失败，事务会标记为 ABORTED；</p>
<p>如果是数据所在的 BE 节点超过半数挂掉，此次 Stream Load 会失败，事务会标记为 ABORTED。</p>
<ul>
<li>Doris Stream Load 时 FE 节点挂掉 会怎样？</li>
</ul>
<p>因为 FE 的 Master 节点是事务管理者，中间会有多次 BE 和 FE 的请求交互，如果 FE 的 Master 节点在 BE 请求时挂掉，此次导入也会失败。FE 的非 Master 节点挂掉应该不会有啥影响。</p>
<ul>
<li>Doris Stream Load 的数据源格式？</li>
</ul>
<p>CSV 格式，默认的列分隔符是 &#39;\t&#39;,行分隔符是 &#39;\n&#39;。列分隔符可以自定义。</p>
<h2 id="kafka-to-doris">Kafka To Doris</h2>
<p>目前 Doris 社区最新版已经提供了<a href="https://github.com/apache/incubator-doris/blob/master/docs/help/Contents/Data%20Manipulation/streaming.md">Stream Load</a> 的能力，可以通过 Http 接口进行流式数据的实时导入，但是还没有对接 Kafka 等数据源。 </p>
<p>而我司用户对 Kafka To Doris 实时导入的需求十分迫切，所以我在 Doris 外围实现了 Kafka To Doris 的 Exactly Once 实时数据同步。</p>
<h3 id="方案设计">方案设计</h3>
<p><img src="media/15436477918196/kafka%20to%20doris.png" alt="kafka to doris"></p>
<p>如上图所示，Kafka To Doris 的功能是由 Doris-Load 这个 Java 进程实现，该进程可以运行在任意可以 ping 通 Kafka 集群和 Doris 集群的节点上。</p>
<p>Doris-Load 进程包含以下 5 类线程：</p>
<ul>
<li><strong>Main Thread</strong>： 配置解析，获取 Doris Column 元数据，获取 Kafka Topic 元数据，修复空洞 Offset 数据，获取消费的 Offset 并 Seek</li>
<li><strong>Consume Thread</strong>： 从 Kafka 持续 Pull 数据，达到 buffer_size 或一定时间后 启动 Send Thread；周期性检查 Kafka 的 Partition 变更</li>
<li><strong>Send Thread</strong>：根据 Doris Column 元数据将 Kafka 数据转换为 CSV 格式，并通过 Http 请求发送到 Doris</li>
<li><strong>Commit Thread</strong>：将 Mysql Meta 表中<strong>已经连续成功 Load 进 Doris 的最大sequence_id</strong>对应记录的 kafka_committed 置为 1，并删除小于此 sequence_id 的所有记录</li>
<li><strong>Add Partition Thread</strong>: 对于分区的 Doris 表，需要周期性新建 Partition</li>
</ul>
<h3 id="mysql-meta-schema">Mysql Meta Schema</h3>
<p><img src="media/15436477918196/kafka-to-doris-mysql-meta.png" alt="kafka-to-doris-mysql-meta"></p>
<h3 id="核心数据结构">核心数据结构</h3>
<ul>
<li><code>OffsetsRecord</code> : 代表 Mysql Meta 中一行记录</li>
<li><code>List&lt;String&gt; recordsBuffer</code> : 待发送的一批 Kafka ConsumerRecord</li>
<li><code>Long curSequenceId</code>:  当前的 SequenceId</li>
<li><code>Long lastCommittedSequenceId</code>: 所有小于等于 lastCommittedSequenceId 的 SequenceId 对应的 Kafka 消息已成功 Load 进 Doris， 也意味着在 Mysql Meta 中所有 SequenceId 小于等于 lastCommittedSequenceId 的记录都可以删除</li>
<li><code>Map&lt;Long, OffsetsRecord&gt; pendingRecords</code>: 等待 Doris response 的 Kafka Record，key 是 SequenceId</li>
<li><code>List&lt;Long&gt; pendingCommittedIds</code>: SequenceId 对应的数据已经成功 Load 进 Doris，等待更新 Mysql Meta 中的 kafka_committed 字段</li>
</ul>
<h3 id="如何保证-exactly-once">如何保证 Exactly Once</h3>
<p>保证 Kafka 数据的 Exactly Once 消费等价于同时保证消费 Kafka 数据不丢且不重。</p>
<p><strong>如何保证消费 Kafka 数据时不丢？</strong></p>
<p>精确记录 Kafka 消费的 Offset，只有在确认 Kafka 消息成功被 Doris 消费时，对应的 Kafka 消息才会在 Mysql Meta 中被标记为 Committed。</p>
<p><strong>如何保证消费 Kafka 数据时不重？</strong></p>
<p>核心是借助 Doris Stream Load 的 Label 机制： Doris 的每个 Stream Load Http 请求可以增加一个 Label 的 Http Header，Doris 可以保证相同 Label 的数据在 7 天（可配置）内只能 Load 一次，重复提交会报错，而且可以根据 Label 查询每个 Load 请求的状态。</p>
<p>所以我们只需要让 Load 请求的 Label（dorisDb_dorisTable_sequence_id）和 Kafka 消息的 offsets 严格对应，就可以保证不会重复消费 Kafka 的消息，<strong>即使当 后发送的 Http 请求成功但是先发送的 Http 请求失败时 也可以保证。</strong></p>
<h3 id="正常的数据处理流">正常的数据处理流</h3>
<ol>
<li>curSequenceId 自增，curOffsetsRecord 初始化。</li>
<li>Consume Thread <ol>
<li>从 Kafka 持续 Pull 数据</li>
<li>将 Kafka record 放入 recordsBuffer</li>
<li>如果 recordsBuffer 达到 buffer_size 或是 达到一定时间，启动 Send Thread</li>
<li>curSequenceId 自增，curOffsetsRecord 和 recordsBuffer 初始化。</li>
</ol>
</li>
<li>Send Thread<ol>
<li>根据 Doris Column 元数据将 Kafka 数据转换为 CSV 格式</li>
<li>更新 curOffsetsRecord 的 startOffsets 和 endOffsets</li>
<li>Mysql Meta 中插入 curOffsetsRecord 记录;将 curSequenceId 和 curOffsetsRecord 插入 pendingRecords</li>
<li>发送 Http Load 请求到 Doris</li>
<li>Mysql Meta 中 更新 doris_loaded 字段 为 1，pendingCommittedIds 中插入当前请求的 SequenceId</li>
</ol>
</li>
<li>Commit Thread<ol>
<li>周期性 check pendingCommittedIds 的 size,如果超出阈值，则再 check lastCommittedSequenceId 之后的连续 60 个 SequenceId 对应的数据是否已成功 Load 进 Doris，如果是，则更新 Mysql Meta 中第 lastCommittedSequenceId + 60 个 OffsetsRecord 的 kafka_committed 为 1，并删除所有 SequenceId 小于 lastCommittedSequenceId + 60 的记录</li>
</ol>
</li>
</ol>
<h3 id="初始化过程">初始化过程</h3>
<ol>
<li>从 Doris 获取 Column 元数据。</li>
<li>从 Kafka 获取 Topic 的 Partition 元数据。</li>
<li>如果是第一次启动，根据配置决定是 seekToBeginning 还是 seekToEnd，启动 Consume Thread，进入正常的数据处理流。否则进入第 4 步。</li>
<li>获取 Mysql Meta 表中所有 doris_loaded 为 0 的 OffsetsRecord，从 Doris 检查每个 OffsetsRecord 的状态，如果成功则更新 Mysql Meta 表，如果失败，则重新从 Kafka 中消费指定 offsets 的 record，并同步发送 Load 请求到 Doris，失败则进程退出，成功则更新 Mysql Meta 表。</li>
<li>从 Mysql Meta 表获取最后一条成功 Load 进 Doris 的 OffsetsRecord，根据 OffsetsRecord 的 endOffsets seek 到下一个 Offset，启动 Consume Thread，进入正常的数据处理流。</li>
</ol>
<h3 id="实现细节">实现细节</h3>
<ul>
<li>如何处理 Kafka Partition 变更</li>
</ul>
<p>定期 check Kafka topic 的 Partition 数量，如果发现有变化，则重新 assign。</p>
<ul>
<li>如何处理 Doris Schema 变更</li>
</ul>
<p>目前需要重启 Doris-Load 进程。
后续可以考虑将 Doris 建模和 Doris-Load 进程的管理集成到一个系统（Doris 管理平台），做到用户在 Schema 变更后，自动重启 Doris-Load 进程。</p>
<ul>
<li>如何处理错误数据</li>
</ul>
<p>根据配置决定是否可以容忍错误数据，如果不可以容忍，则 Doris-Load 进程退出；否则每批数据可以容忍指定百分比的错误数据。</p>
<ul>
<li>如何加速 Json 解析</li>
</ul>
<p>目前 Doris-Load 进程最耗 CPU，最慢的部分就是 Kafka 消息的 Json 解析。 我 Json 解析用的是 Jackson 框架，开始时是直接把 Json 当做一个 map 来解析，后来发现在实际使用中 Doris 的字段数是很有可能小于 Kafka 的字段数的，所以就采用了运行时根据 Doris 的元数据动态生成类，让 Jackson 根据生成的类解析 Json 的方式。这种方式不仅可以提高解析速度，同时还可以更容易的保证导入时列的顺序，进行 Null 值的处理。</p>
<ul>
<li>如何处理 FE 和 BE 重启的 Case</li>
</ul>
<p>由于 Doris 的 Http Load 有 label 机制，所以每次 Http Load 是幂等的，所以我们可以 Catch FE 或 BE 连接失败的异常，并进行重试。</p>
<ul>
<li>如何用 0.10 的 kafka 客户端消费 0.8 的 Kafka 数据</li>
</ul>
<p>让 0.10 的 KafkaConsumer 使用 V0 版本的 API，即可支持 0.8 的 Kafka 集群，同时需要保证每次在 assign 和 seek 时必须包含所有的 partition，否则在 pull 数据时会触发 updateFetchPositions 逻辑，就会报 SchemaException。</p>
<h2 id="总结">总结</h2>
<p>Kafka To Doris 在我司12月6号已上线，目前线上已运行了10多个同步流程，期间也完善了一些异常处理，监控报警。后续会参考 Druid Kafka Indexing Service 启动一个 Service，暴露以下 API： 管理 Doris-Laod 进程的启动，关闭和重启；查看每个 Doris-Load 进程的存活状态，消费状态。</p>

            <hr/>
            <div style="padding: 0; margin: 10px auto; width: 90%; text-align: center">
                <button id="rewardButton" , disable="enable" ,
                        onclick="var qr = document.getElementById('QR'); if (qr.style.display === 'none') {qr.style.display='block';} else {qr.style.display='none'}"
                        ,
                        style="cursor: pointer; border: 0; outline: 0; border-radius: 100%; padding: 0; margin: 0; letter-spacing: normal; text-transform: none; text-indent: 0px; text-shadow: none">
                    <span style="display: inline-block; width: 60px; height: 60px; border-radius: 100%; line-height: 58px; color: #fff; font-size:36px; font-family: 'Palatino Linotype', 'Book Antiqua', Palatino, Helvetica, STKaiti, SimSun, serif; background: rgb(236,96,0)">赞</span>
                </button>
                <div id="QR" style="display: none;">
                    <p><img src="../images/weixin.jpeg" width="200" /></p>
                    <p><img src="../images/zhifubao.jpeg" width="200" /></p>
                </div>

            </div>
            <h3>评论</h3>
            <div id="vcomment"></div>
        </div>
        <div class="col-md-1"></div>
    </div>
</div>

<div class="row" style="padding-top: 60px">
    <div class="container center-block">
        <div class="col-md-1"></div>
        <div class="col-md-10 col-sm-12">
            <div class="ds-thread"
                 data-thread-key=5c2b4ff6df5dd7c720398f76
                 data-title=Apache Doris Stream Load &amp; Kakfa To Doris
                 data-url=kafka-to-doris>
            </div>
        </div>
        <div class="col-md-1"></div>
    </div>
</div>

<div class="footer">
    <a href="https://www.bcmeng.com/" target="_blank"  rel="nofollow">康凯森</a>
</div>

<script src="../js/code.js"></script>
<script>hljs.initHighlightingOnLoad();</script>
<script src="../js/jquery.min.js"></script>
<script src="../js/bootstrap.js"></script>
<script>
    var _hmt = _hmt || [];
    (function() {
        var hm = document.createElement("script");
        hm.src = "https://hm.baidu.com/hm.js?1d198a377ef466190881d1c021155925";
        var s = document.getElementsByTagName("script")[0];
        s.parentNode.insertBefore(hm, s);
    })();
</script>
<script src="../js/av-min.js"></script>
<script src='../js/Valine.min.js'></script>
<script type="text/javascript">
    window.valine = new Valine({
        el: '#vcomment' ,
        verify: true,
        notify: true,
        appId: 'BlLnB0re5OzQVzrgEplAxkyg-gzGzoHsz',
        appKey: 'wUyxSV0U4Vi7oK1EHK6ipErv',
        placeholder: '欢迎评论'
    });
</script>

</body>
</html>